Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request establishes a foundational cron job and task scheduling service within the application. It enables the definition of various background tasks, provides mechanisms to schedule them using AWS EventBridge Scheduler, and processes them asynchronously via an AWS SQS-backed worker. This system is crucial for automating recurring operational tasks, such as the newly introduced cleanup of instances belonging to canceled subscriptions, thereby improving resource management and system hygiene.
PR #229 Review: feat: implement cronjob service and scheduler

Overall the architecture is solid — clean separation between services/jobs (domain types), api/tasks (AWS adapters), and the task_worker binary. Good test coverage and safe defaults (disabled by default). A few critical issues need addressing before merge.

CRITICAL

1. Privacy violation — NoopTaskPayload.note logged at INFO
2. SQS message never deleted on parse failure — infinite redelivery loop

3. list_instances hard-coded to 1000 — silently skips instances beyond that limit
4. API server startup blocked by AWS scheduler — startup fails if AWS is unavailable
5. Transient SQS errors kill the worker process
MINOR

6. ConflictException detection via string matching is fragile
7. upsert_task duplicates create_task_if_absent logic

Both methods share near-identical create + conflict-check + update code. Extract a shared helper to prevent future drift.

8. rust-toolchain.toml bump is a separate concern

The toolchain bump is unrelated to the cronjob feature and could be split into its own PR.
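To make issue 2 concrete, here is a minimal, self-contained sketch of the delete-on-parse-failure pattern. The names (handle_message, HandleOutcome) are hypothetical stand-ins for the PR's actual worker code, and the u64 parse stands in for deserializing the real task payload; the point is that the worker must acknowledge (delete) unparseable messages too, or SQS will redeliver them forever.

```rust
#[derive(Debug)]
enum HandleOutcome {
    Processed,
    // Unparseable: the caller deletes the SQS message anyway (or routes it
    // to a dead-letter queue) so it is not redelivered indefinitely.
    Poison,
}

fn handle_message(body: &str) -> HandleOutcome {
    // Stand-in for deserializing the real task payload.
    match body.parse::<u64>() {
        Ok(task_no) => {
            println!("processed task #{task_no}");
            HandleOutcome::Processed
        }
        Err(err) => {
            // Log and classify as poison; the worker then deletes the SQS
            // message in BOTH outcomes instead of only on success.
            eprintln!("dropping unparseable task message: {err}");
            HandleOutcome::Poison
        }
    }
}

fn main() {
    assert!(matches!(handle_message("42"), HandleOutcome::Processed));
    assert!(matches!(handle_message("not-a-task"), HandleOutcome::Poison));
    println!("poison messages are acknowledged, not redelivered");
}
```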
Code Review
This pull request introduces a new task worker system, integrating with AWS SQS and EventBridge Scheduler for managing and executing background tasks. Key changes include adding AWS SDK dependencies, defining task structures and executor/scheduler traits, and implementing an AWS-specific task scheduler and SQS worker. A new binary task_worker.rs handles task execution, including a daily cleanup for canceled instances. The API server is updated to ensure the daily cleanup task is scheduled on startup. Review feedback suggests improving performance in the cleanup task by processing instance deletions concurrently and enhancing error handling robustness in the AWS scheduler by matching specific SDK error types instead of string comparisons.
for instance in cleanup_targets.drain(..) {
    if let Err(err) = self.agent_service.delete_instance(instance.id).await {
        failed_instances += 1;
        tracing::error!(
            "cleanup task: delete failed instance_id={} user_id={} status={} err={}",
            instance.id,
            user_id,
            instance.status,
            err
        );
    } else {
        tracing::info!(
            "cleanup task: deleted instance_id={} user_id={} previous_status={}",
            instance.id,
            user_id,
            instance.status
        );
    }
}
The current implementation deletes instances for a user sequentially. If a user has many instances, this could be slow. You can improve performance by deleting them concurrently using futures::stream.
You'll need to add use futures::StreamExt; to the top of the file.
let results = futures::stream::iter(cleanup_targets.into_iter())
.map(|instance| {
let agent_service = Arc::clone(&self.agent_service);
async move {
let res = agent_service.delete_instance(instance.id).await;
(res, instance)
}
})
.buffer_unordered(10)
.collect::<Vec<_>>()
.await;
for (result, instance) in results {
match result {
Ok(_) => {
tracing::info!(
"cleanup task: deleted instance_id={} user_id={} previous_status={}",
instance.id,
user_id,
instance.status
);
}
Err(err) => {
failed_instances += 1;
tracing::error!(
"cleanup task: delete failed instance_id={} user_id={} status={} err={}",
instance.id,
user_id,
instance.status,
err
);
}
}
}
I agree this is a performance concern, but not a correctness issue. Given our current instance limits (5 max per user), it's unlikely to materially affect normal production users.
crates/api/src/tasks/mod.rs
match create.send().await {
    Ok(_) => Ok(()),
    Err(err) => {
        let err_text = err.to_string();
        if err_text.contains("ConflictException") || err_text.contains("already exists") {
            let mut update = self
                .client
                .update_schedule()
                .name(request.task_id.as_str())
                .group_name(&self.scheduler_group)
                .schedule_expression(&expression)
                .flexible_time_window(flex_window)
                .target(target);

            if matches!(request.schedule, ScheduleSpec::At(_)) {
                update = update.action_after_completion(ActionAfterCompletion::Delete);
            }

            update.send().await.map_err(|e| {
                anyhow!(
                    "failed to update existing schedule for task_id={}: {:?}",
                    request.task_id,
                    e
                )
            })?;
            Ok(())
        } else {
            Err(anyhow!(
                "failed to create schedule for task_id={}: {:?}",
                request.task_id,
                err
            ))
        }
    }
Instead of matching on the string representation of the error, it's more robust to match on the specific error type provided by the AWS SDK. This avoids potential breakages if the error message string changes in a future SDK version.
You can use aws_sdk_scheduler::error::SdkError::ServiceError and match on the CreateScheduleError::ConflictException variant. This same pattern should also be applied to the create_task_if_absent function.
You'll need to add the following imports at the top of the file:
use aws_sdk_scheduler::error::SdkError;
use aws_sdk_scheduler::operation::create_schedule::CreateScheduleError;

match create.send().await {
Ok(_) => Ok(()),
Err(SdkError::ServiceError(service_error))
if matches!(service_error.err(), CreateScheduleError::ConflictException(_)) =>
{
let mut update = self
.client
.update_schedule()
.name(request.task_id.as_str())
.group_name(&self.scheduler_group)
.schedule_expression(&expression)
.flexible_time_window(flex_window)
.target(target);
if matches!(request.schedule, ScheduleSpec::At(_)) {
update = update.action_after_completion(ActionAfterCompletion::Delete);
}
update.send().await.map_err(|e| {
anyhow!(
"failed to update existing schedule for task_id={}: {:?}",
request.task_id,
e
)
})?;
Ok(())
}
Err(err) => Err(anyhow!(
"failed to create schedule for task_id={}: {:?}",
request.task_id,
err
)),
}
Pull request overview
Implements a basic background task system using AWS EventBridge Scheduler + SQS, including shared task definitions in services, API-side schedule creation on startup, and a dedicated SQS worker binary to execute tasks.
Changes:
- Added services::jobs task primitives (IDs, schedules, payloads) plus dispatching helpers.
- Added AWS scheduler + SQS worker plumbing in the API crate and a task_worker binary.
- Introduced TaskConfig and env example entries to configure scheduler/worker behavior.
Reviewed changes
Copilot reviewed 9 out of 11 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| rust-toolchain.toml | Bumps Rust toolchain version. |
| env.example | Documents new scheduler/worker env vars. |
| crates/services/src/lib.rs | Exposes new jobs module. |
| crates/services/src/jobs/mod.rs | Defines task types, scheduling spec, and dispatching. |
| crates/config/src/lib.rs | Adds TaskConfig with env-driven defaults and tests. |
| crates/api/src/tasks/mod.rs | Implements AWS Scheduler integration and SQS worker loop helpers. |
| crates/api/src/main.rs | Ensures daily cleanup schedule at API startup. |
| crates/api/src/lib.rs | Exposes new tasks module. |
| crates/api/src/bin/task_worker.rs | Adds worker executable to process SQS task messages and run cleanup logic. |
| crates/api/Cargo.toml | Adds AWS SDK dependencies. |
| Cargo.lock | Locks AWS SDK + transitive dependency updates. |
crates/services/src/jobs/mod.rs
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct TaskId(String);
TaskId has a new() validator, but deriving Deserialize with #[serde(transparent)] allows any string to deserialize into TaskId without validation. Consider enforcing validation during deserialization (e.g., #[serde(try_from = "String")] + TryFrom<String> calling TaskId::new), or validating in the SQS message parser.
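A minimal sketch of the try_from approach suggested above, using only the standard library. The validation rules here (non-empty, at most 64 chars, ASCII alphanumeric plus "-" and "_") are assumptions — substitute whatever TaskId::new actually enforces — and the serde wiring would be `#[serde(try_from = "String", into = "String")]` on the struct.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskId(String);

impl TryFrom<String> for TaskId {
    type Error = String;
    fn try_from(value: String) -> Result<Self, Self::Error> {
        // Hypothetical rules; the real ones live in TaskId::new.
        let valid = !value.is_empty()
            && value.len() <= 64
            && value
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_');
        if valid {
            Ok(TaskId(value))
        } else {
            Err(format!("invalid task id: {value:?}"))
        }
    }
}

// Required by #[serde(into = "String")] for serialization.
impl From<TaskId> for String {
    fn from(id: TaskId) -> String {
        id.0
    }
}

fn main() {
    assert!(TaskId::try_from("daily-cleanup".to_string()).is_ok());
    assert!(TaskId::try_from("has spaces".to_string()).is_err());
    assert!(TaskId::try_from(String::new()).is_err());
    println!("TaskId validation runs on every conversion");
}
```

With this wiring, deserializing an SQS message containing an invalid id fails at the serde boundary instead of producing an unvalidated TaskId.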
crates/api/src/tasks/mod.rs
Err(err) => {
    let err_text = err.to_string();
    if err_text.contains("ConflictException") || err_text.contains("already exists") {
        tracing::info!(
            "schedule already exists, skipping create task_id={} group={}",
create_task_if_absent detects an existing schedule by substring-matching err.to_string() ("ConflictException"/"already exists"). This is brittle across SDK versions and can misclassify errors. Prefer matching on the typed AWS SDK error variants for conflict vs. other failures.
crates/api/src/tasks/mod.rs
Err(err) => {
    let err_text = err.to_string();
    if err_text.contains("ConflictException") || err_text.contains("already exists") {
        let mut update = self
            .client
upsert_task also relies on substring-matching err.to_string() to detect create conflicts. Consider switching to matching on the concrete AWS SDK error variants (e.g., conflict) so schedule updates don’t depend on error text formatting.
crates/api/src/tasks/mod.rs
.wait_time_seconds(self.wait_seconds.clamp(1, 20))
.visibility_timeout(self.visibility_timeout.max(1))
.send()
.await
.context("failed to receive SQS messages")?;
In run_forever, a transient receive_message failure will currently return an error (and terminate the worker) because the call is ?-propagated. For a long-running worker, consider catching/logging the error and retrying with a delay/backoff instead of exiting the loop.
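The retry-with-backoff idea can be sketched without the AWS SDK. Here receive_batch stands in for the SQS receive_message call, max_polls bounds the loop only so the demo terminates (the real worker loops forever), and the backoff parameters are illustrative.

```rust
use std::thread::sleep;
use std::time::Duration;

// Returns the number of successful polls (for demonstration); the real
// worker would loop indefinitely and process the received messages.
fn run_forever(
    mut receive_batch: impl FnMut() -> Result<usize, String>,
    max_polls: usize,
) -> usize {
    let mut backoff = Duration::from_millis(10);
    let max_backoff = Duration::from_secs(30);
    let mut successes = 0;
    for _ in 0..max_polls {
        match receive_batch() {
            Ok(n) => {
                println!("received {n} messages");
                successes += 1;
                backoff = Duration::from_millis(10); // reset after success
            }
            Err(err) => {
                // Log and retry instead of propagating with `?`, which
                // would terminate the worker on a transient error.
                eprintln!("receive failed, retrying in {backoff:?}: {err}");
                sleep(backoff);
                backoff = (backoff * 2).min(max_backoff); // capped exponential backoff
            }
        }
    }
    successes
}

fn main() {
    // Simulate two transient failures followed by successes: the worker
    // keeps polling rather than exiting.
    let mut calls = 0;
    let successes = run_forever(
        move || {
            calls += 1;
            if calls <= 2 { Err("timeout".to_string()) } else { Ok(3) }
        },
        4,
    );
    assert_eq!(successes, 2);
    println!("worker survived transient receive errors");
}
```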
crates/api/src/tasks/mod.rs
if let Some(receipt_handle) = message.receipt_handle() {
    client
        .delete_message()
        .queue_url(queue_url)
        .receipt_handle(receipt_handle)
        .send()
        .await
        .context("failed to delete SQS message")?;
}
If receipt_handle is missing, the code currently skips deletion and still returns Ok(()), which will cause the message to be redelivered and reprocessed indefinitely. Consider treating a missing receipt_handle as an error (or at least logging and returning an error) so it’s visible and doesn’t silently loop.
Suggested change:

let receipt_handle = message
    .receipt_handle()
    .ok_or_else(|| anyhow!("SQS message missing receipt_handle"))?;
client
    .delete_message()
    .queue_url(queue_url)
    .receipt_handle(receipt_handle)
    .send()
    .await
    .context("failed to delete SQS message")?;
I’m not changing this one in this PR. If receipt_handle is missing, we can’t delete the message either way, so returning an error instead of Ok(()) doesn’t actually change the redelivery behavior. In normal SQS ReceiveMessage responses a receipt handle is expected, so I’m treating this as an edge-case defensive improvement rather than a priority fix.
@claude other issues are fixed, please review again
Claude finished @danielwpz's task in 58s — View job

Re-review of PR #229

All four previously-blocking issues have been addressed. Here's the status:

Previously Critical — Now Fixed ✅

1. Privacy violation — NoopTaskPayload.note logged at INFO
2. SQS poison-message infinite loop on parse failure
4. API server startup blocked by AWS scheduler
5. Transient SQS errors kill the worker process

Issue 3 — Pagination gap at 1,000 instances (deferred ✓)

Acknowledged and accepted as a follow-up; the 1,000-instance hard cap is well outside current per-user limits.

Remaining Minor Issues

These were flagged as minor in the first review and remain unaddressed:

6. ConflictException detected via string matching
7. upsert_task duplicates create_task_if_absent logic
8. rust-toolchain.toml bump is a separate concern
9. Missing …

Overall: The PR is in good shape. All critical blockers from the previous review are resolved. The remaining items are minor polish. Ready to merge once you've decided whether items 6–9 are worth a follow-up or addressing inline.
Fixed the duplicate scheduler create/setup logic by pulling the shared request/create path into common helpers. I'm keeping the rust-toolchain bump in this PR because it is required by the AWS SDK update in this change set. Also fixed the logging issue, invalid-message handling, startup behavior, and the transient SQS receive retry path in the latest update.
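As a rough illustration of that refactor (the names below are hypothetical, not the PR's actual API): both the create and upsert paths build the same schedule request, so constructing it in one shared helper keeps the two code paths from drifting apart.

```rust
#[derive(Debug, Clone, PartialEq)]
struct ScheduleRequest {
    name: String,
    group: String,
    expression: String,
    delete_after_completion: bool,
}

// Single place where the request is assembled; create_task_if_absent and
// upsert_task would both call this instead of duplicating the builder chain.
fn build_schedule_request(
    task_id: &str,
    group: &str,
    expression: &str,
    one_shot: bool,
) -> ScheduleRequest {
    ScheduleRequest {
        name: task_id.to_string(),
        group: group.to_string(),
        expression: expression.to_string(),
        // One-shot "at(...)" schedules are deleted after they fire.
        delete_after_completion: one_shot,
    }
}

fn main() {
    // Both call sites now produce byte-identical requests by construction.
    let create = build_schedule_request("daily-cleanup", "default", "rate(1 day)", false);
    let update = build_schedule_request("daily-cleanup", "default", "rate(1 day)", false);
    assert_eq!(create, update);
    println!("shared request construction keeps both paths identical");
}
```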
Claude finished @danielwpz's task in 1m 15s — View job

Re-review of PR #229

All previously flagged issues have been resolved. Here's the complete status:

Previously Critical — All Fixed ✅

Previously Minor — All Fixed ✅

6. ConflictException now uses typed SDK variants
7. Duplicate create/update logic extracted

Copilot: Intentionally Deferred (Acknowledged)

Overall: All critical and minor issues from previous reviews are resolved. The code is clean, well-tested, and the architectural decisions are sound. This PR is ready to merge.
@gemini-code-assist review |
Code Review
This pull request introduces a background task system leveraging AWS SQS and EventBridge Scheduler. It adds a new task_worker binary, a task execution framework within the services crate, and a specific job for cleaning up agent instances linked to canceled subscriptions. Additionally, the Rust toolchain is updated to version 1.91.1. Feedback was provided concerning the list_instances call in the cleanup task, which currently lacks pagination and could leave orphaned resources if a user exceeds the 1000-instance limit.
let (instances, _) = match self.agent_service.list_instances(user_id, 1000, 0).await {
    Ok(result) => result,
    Err(err) => {
        tracing::error!(
            "cleanup task: failed to list instances user_id={} err={}",
            user_id,
            err
        );
        continue;
    }
};
The current implementation fetches only up to 1000 instances per user due to the hardcoded limit and lack of pagination. If a user has more than 1000 instances, any additional instances will not be cleaned up, leading to orphaned resources. To ensure all instances are processed, you should paginate through the results of list_instances.
let instances = {
let mut all_instances = Vec::new();
let mut offset = 0;
const LIMIT: i32 = 1000;
loop {
match self.agent_service.list_instances(user_id, LIMIT, offset).await {
Ok((batch, total)) => {
let fetched_count = batch.len();
all_instances.extend(batch);
if all_instances.len() >= total as usize
|| fetched_count < LIMIT as usize
{
break Ok(all_instances);
}
offset += fetched_count as i32;
}
Err(err) => {
tracing::error!(
"cleanup task: failed to list instances user_id={} err={}",
user_id,
err
);
break Err(());
}
}
}
};
let instances = match instances {
Ok(i) => i,
Err(_) => continue,
};
This won’t happen in practice with the current limits. Each user can have at most 5 instances right now, so the 1,000 cap is well outside normal operation.
Thank you for the update, @danielwpz. I have reviewed the latest changes, including the refactored scheduler logic and the addressed minor issues. The implementation of the task scheduling and worker service is now robust and well-structured. Given that the pagination gap is understood and deferred, and the toolchain bump is necessary for the SDK update, this PR is in excellent shape. I have no further concerns.
No description provided.